package com.lianda.connectors.sink;

import com.lianda.connectors.utils.ExecutionEnvUtil;
import com.lianda.connectors.utils.KafkaConfigUtil;
import com.lianda.model.MetricEvent;
import com.lianda.model.schemas.MetricSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

/**
 * 数据写入Kafka
 */
public class SinkToKafkaMain {
    public static void main(String[] args) throws Exception {
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
        DataStreamSource<MetricEvent> data = KafkaConfigUtil.buildSource(env);

        data.addSink(
                new FlinkKafkaProducer011(
                        parameterTool.get("kafka.sink.brokers"),
                        parameterTool.get("kafka.sink.topic"),
                        new MetricSchema())).name("flink-connectors-kafka")
                .setParallelism(parameterTool.getInt("stream.sink.parallelism"));

        env.execute("flink sink to kafka ");

    }
}
